package com.atguigu.bean;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;

/**
 * 去读kafka中calllog主题的数据到hbase
 *
 * Project: DXXM
 * Package: com.atguigu
 * Version: 1.0
 * <p>
 * Created by  WangJX  on 2020/2/20 20:43
 */
public class HBaseConsumer {
    public static void main(String[] args) {
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(PropertiesUtil.properties);
        kafkaConsumer.subscribe(Collections.singletonList(PropertiesUtil.getProperty("kafka.topics")));
        HBaseDAO hBaseDAO = new HBaseDAO();
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord cr : records) {
                //打印从kafka中读取到的数据
                System.out.println(cr.value());
                hBaseDAO.put(cr.value().toString());
            }
        }
    }
}
